/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include "echo.h"

#define TYPE_NP			0
#define TYPE_CP			1
#define TYPE_SCP		2
#define TYPE_ECP		3
#define TYPE_GCP		4

#define MEDIATYPE_FIRST		0.149

#define MAX_IDLE		90	/* destruct a connection to sp/np after 90s */
//#define P2PS_PORT		50002
#define TS4CP_PORT		22168

#define REQUEST_AHEAD		15	/* Request how many blocks if a block doesn't exist in cache */

int MAX_P2PS=500;
int MAX_P2PC=512;
int MAX_JOB_PER_SESSION=3;

//char *CONFIG = "./acp.cfg";
char *PREFIX="/data/cp/";
char *SCP_CHANNEL;
char *ECP_REGION;
int isGCP;
char *SERVERIP;
char *BINDIP;
char *PIDFile="/var/run/cpnew.pid";
int MAX_BANDWIDTH;

char *AUTH_MD5;
int  AUTH_USERID;

fd_set osocks;
int BINDALL;
#ifdef HAVE_TS
int TSSOCK = 0;
//#define RANDOM_PORT	3947
#endif
time_t CurrentTime;
time_t startTime;
int SnapShotInterval;
int NearPeerInterval = 30;

int isSet = 0;					/* whether TS has returned WELCOME message */
struct TSMessage UDPMsg;

extern struct Channel *ChannelHash[MAX_CHANNEL];
extern struct Channel *ChannelList;

struct sockaddr_in TSADDR;
socklen_t addrlen = sizeof (struct sockaddr_in);

// calculate avg & cur speed
long long totalDownBytes=0, totalUpBytes=0;
long long tmpDownBytes=0, tmpUpBytes=0;

struct ServerDesc TRACKER[MAX_TYPE];

char *LOGXML;

extern int errno;

int JobHighWater = 10000;
int MaxNPPerChannel = 300;
int cfgP2PS_PORT = 23;
int cfgCP2TS_PORT = CP2TS_PORT;

struct NamVal ConfigParameters[]
= 
{
{"Prefix", &PREFIX, 's'},
{"MAX_NP", &MAX_P2PS, 'd'},
{"MAX_SP", &MAX_P2PC, 'd'},
{"Pidfile", &PIDFile, 's'},
{"TrackerIP", &SERVERIP, 's'},
{"SCP", &SCP_CHANNEL, 's'},
{"GCP", &isGCP, 'd'},
{"authid", &AUTH_USERID, 'd'},
{"authmd5", &AUTH_MD5, 's'},
{"ECP", &ECP_REGION, 's'},
{"BandWidth", &MAX_BANDWIDTH, 'd'},
{"BindIP", &BINDIP, 's'},
{"SnapShotInterval",&SnapShotInterval,'d'},
{"CP4NP_PORT", &cfgP2PS_PORT, 'd'},
{"CP2TS_PORT", &cfgCP2TS_PORT, 'd'},
{"LogFilePath", &LOGXML, 's'},
{"NearPeerInterval", &NearPeerInterval, 'd'},
{"JobHighWater", &JobHighWater, 'd'},
{"MaxNPPerChannel", &MaxNPPerChannel, 'd'},
{"BINDALL", &BINDALL, 'd'}
};

int register_cp ();
int init_cp ();
int handle_new_connection(int sock, int type);
int Clientclosure (int listnum, int type);
void process_child (void);
int init_P2PS (int listnum);
int process_P2PS (int listnum);
int closure_P2PS (int listnum);
int init_P2PC (int listnum);
int process_P2PC (int listnum);
int closure_P2PC (int listnum);
int sendMessage (int sock, char *ptr, struct sockaddr_in *dest);
int process_TS2CP_PEERS (char *buf);
void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks);
int reconnect (struct Session *p);
char *parseECP (char *str, char *buf);
int closure_TS ();
int periodCheck (float KBPSused);
void makeSnapShot(int count, int time_interval);
int send_nearpeers (struct Channel *pc, struct Edge *pme);
int send_nearpeers_toall (struct Channel *pc);
#ifdef HAVE_TS
int process_TS();
#endif
int send_P2P_PUSHLIST (struct Channel *pc, int id);
int period_process (void);
extern char *getJobBuffer (struct JobDes *p, int *max);
extern inline void setblockId (struct JobDes *pj, int id);

#include "sessions.c"
#define INIT_MAXQ(pc,s,maxq)		do\
{\
	if (s->maxBlockID == 0) return -1;\
	if (s->maxBlockID >= MAX_QUEUE) maxq = MAX_QUEUE;\
	else maxq = s->maxBlockID;\
	if (pc->pcinfo->indisk == NULL)\
	{\
		pc->pcinfo->max_queue = maxq;\
		pc->pcinfo->indisk = calloc (maxq, 1);\
		pc->pcinfo->bitflag = calloc ((maxq+7)/8, 1);\
		if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)\
			return -1;\
	} else if (maxq < pc->pcinfo->max_queue)\
	{\
		pc->pcinfo->max_queue = maxq;\
		pc->pcinfo->indisk = realloc (pc->pcinfo->indisk, maxq);\
		pc->pcinfo->bitflag = realloc (pc->pcinfo->bitflag, (maxq+7)/8);\
		if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)\
			return -1;\
	}\
} while (0)

int period_process (void)
{
	static time_t last_snapshot;
	static int snapCount = 0;

	if (CurrentTime - last_snapshot > SnapShotInterval)
	{
		makeSnapShot(snapCount++, CurrentTime-last_snapshot);
		system("/usr/bin/vmstat -a >> cp.log 2>&1 &");
		last_snapshot = CurrentTime;
	}
	return 0;
}
		
int main(int argc, char **argv)
{
	int i, mode = 1, tmp = 0;

	if (argc < 2)
	{
		printf("usage: %s mode(0 for daemon, 1 for console).\n", argv[0]);
		return -1;
	}
	signal (SIGPIPE, SIG_IGN);
	signal (SIGINT, terminate);

	mode = atoi (argv[1]);

	if (mode == 0)
		daemon(1,1);

	read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));

	for(i = 0 ; i < argc; ++i)
	{
		if(strncmp(argv[i], "tcp=", 4) == 0)
		{
			tmp = atoi(argv[i]+4);
			if(tmp > 0 && tmp < 65535)
				cfgP2PS_PORT = tmp;
		}
		else if(strncmp(argv[i], "udp=", 4) == 0)
		{
			tmp = atoi (argv[i]+4);
			if(tmp > 0 && tmp < 65535)
				cfgCP2TS_PORT = tmp;

		}
	}

	for (i=0; i<10 && IN_LOOP > 0; i++)
	{
		/*
		pid_t pid;
		if ((pid = fork ()) == 0)
		{
		*/
			FD_ZERO(&osocks);
			if (init_cp () < 0)// || initLOG () < 0)
			{
				PDEBUG ("init_cp error, exit...\n");
				exit (-1);
			}
			process_child ();
		/*
		} else if (pid < 0)
		{
			perror ("fork");
			exit (pid);
		} else
		{
			waitpid (pid, NULL, 0);
		}
		*/
	}
	return 0;
}

int init_P2PS (int listnum)
{
	return 0;
}

int process_P2P_HELLO (struct Session *p, struct Message *m)
{
	struct Edge *pedge = NULL;
	float version = *(float *)(m->buffer);
	char *buf = m->buffer + sizeof (float);
	struct Channel *pc;
	struct LiveChannelInfo *pcinfo;
	int listnum;
//	char *buf, buffer[MAX_DATA];

	PINFO ("RECV P2P_HELLO. \n");
	p->version = version;
	listnum = p - TRACKER[TYPE_P2PS].head;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
	{
		if ((pc = newLiveChannel (buf, NULL, buf, 0, 0)) != (struct Channel *)0)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		} else
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		p->pc = pc;
	} else
	{
		if (pc->numclient > MaxNPPerChannel)
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
		if (pedge == NULL)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		}
		p->pc = pc;
		if (pc->pcinfo->dataSource) 
		{
			/*
			// connect exist, send p2p_hello to check channel state.
			buf = buffer+sizeof (int);
			*(unsigned char *)buf = P2P_HELLO;
			buf += sizeof (char);
			memcpy (buf, pc->channel_md5, MD5_LEN);
			buf += MD5_LEN;
			*(unsigned char *)buf = 0;
			buf += sizeof (char);
			*(int *)buffer = buf - buffer;
			if (writeMessage (pc->pcinfo->dataSource, buffer) < 0)
			{
				Clientclosure (listnum, TYPE_P2PC);
				return -1;
			}
			PDEBUG("sent P2P_HELLO to SP. \n");
			*/
			buf += MD5_LEN + sizeof (char);
			memcpy (&(p->addr), buf, sizeof (p->addr));
			if (pedge) send_nearpeers (pc, pedge);
			return 0;
		}
		PDEBUG("NO connection to SP. try connect.\n");
	}
	buf += MD5_LEN + sizeof (char);
	memcpy (&(p->addr), buf, sizeof (p->addr));
	buf += sizeof (p->addr);

	pcinfo = pc->pcinfo;
	pcinfo->numofsp = (unsigned char)*buf;
	buf += sizeof (char);
	if (pcinfo->numofsp == 0 || pcinfo->numofsp > MAX_REPSP)
	{
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	memcpy (&(pcinfo->SPLIST), buf, pcinfo->numofsp*sizeof(struct NormalAddress));
	buf += pcinfo->numofsp*sizeof (struct NormalAddress);
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	if (isGCP || SCP_CHANNEL)
	{
		if ((listnum = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
		{
			PDEBUG("Unable connect to SP.\n");
			Clientclosure (p-TRACKER[TYPE_P2PS].head, TYPE_P2PS);
			return -1;
		}
	} else	/* ECP, need peers */
	{
#ifdef HAVE_TS
		UDPMsg.type = CP2TS_NEED_PEERS;
		UDPMsg.len = 12+MD5_LEN;
		memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
/* isSet whether TS has returned WELCOME message */
		if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
		{
			PDEBUG ("exit...\n");
			exit (1);
		}
#endif
	}
	if (pedge) send_nearpeers (pc, pedge);
	return 0;
}

int process_P2P_MEDIATYPE (int listnum, struct Message *m)
{
	struct Channel *pc;
	char *buf, *media, *name, *channel_name;
	int start, length, size, proglen, chnllen;

	buf = m->buffer;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
		return -1;
	buf += MD5_LEN;
	start = *(int *)buf;
	buf += sizeof (int);
	length = *(int *)buf;
	buf += sizeof (int);
	size = *(int *)buf;
	buf += sizeof (int);
	media = buf;
	buf += size;
	proglen = *(unsigned char *)buf;
	buf += sizeof (char);
	name = buf;
	buf += proglen;
	buf += sizeof (int);
	chnllen = *(unsigned char *)buf;
	buf += sizeof (char);
	channel_name = buf;
	buf += chnllen;
	if (buf - (char *)m > m->len)
		return -1;
	addMedia (pc, start, length, size, media, name, channel_name);
	return 0;
}

int process_P2P_PUSHLIST (struct Session *p, struct Message *m)
{
	struct Channel *pc;
	struct Edge *pedge;
	char *buf;
	int i, type, listnum, size;

	listnum = p - TRACKER[TYPE_P2PS].head;
	buf = m->buffer;
	if (p->npcp == TYPE_CP)
	{
		if ((pc = findChannel (buf, MD5_LEN)) == NULL)
		{
			if ((pc = newLiveChannel (m->buffer, NULL, m->buffer, 0, 0)) != (struct Channel *)0)
			{
				pedge=newEdge (pc, p);
				pc->numclient ++;
			}
		} else
		{
			for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
			if (pedge == NULL)
			{
				pedge=newEdge (pc, p);
				pc->numclient ++;
			}
		}
		buf += MD5_LEN;
	} else
	{
		pc = p->pc;
	}

	if (pc == NULL && p->npcp != TYPE_CP)
	{
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	if (p->numjob >= MAX_JOB_PER_SESSION)
		return -2;

	type = *(unsigned char *)buf;
	buf += sizeof (char);
	size = *(unsigned char *)buf;
	buf += sizeof (char);
	if (type)
	{
		deleteChannel (p, pc);
		for (i=0; i<size; i++)
			if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
				return -1;
	} else
	{
		for (i=0; i<size; i++)
			if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
				return -1;
		buf += size*sizeof(int);
		size = *(unsigned char *)buf;
		buf += sizeof (char);
		deleteJob (p, pc, (unsigned int *)buf, size);
	}
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}

	return 0;
}

int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
{
	struct JobDes *pj = newJob ();
	char *buf, *buffer;
	struct LiveChannelInfo *pcinfo;
	int size, max, i;

	buffer = getJobBuffer (pj, &max);
	buf = buffer + sizeof (int);
	*(unsigned char *)buf = P2P_RESPONSE;
	buf += sizeof (char);
	if (p->npcp == TYPE_CP)
	{
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
	}
	pcinfo = pc->pcinfo;
	if (p->first == 0)
	{
		p->first ++;
		if (p->version >= MEDIATYPE_FIRST)
			sendIdMedia (p, pc, id, 0);
	}
	if (pcinfo->dataSource == NULL)
	{
		*(int *)buf = id;
		buf += sizeof (int);
		*(int *)buf = 0;
		buf += sizeof (int);
		if (isGCP || SCP_CHANNEL)
		{
			if ((i = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
				freeLiveChannel (pc, NULL);
		} else
		{
#ifdef HAVE_TS
			UDPMsg.type = CP2TS_NEED_PEERS;
			UDPMsg.len = 12+MD5_LEN;
			memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
/* isSet whether TS has returned WELCOME message */
			if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
			{
				PDEBUG ("exit...\n");
				exit (1);
			}
#endif
		}
	} else if ((size=locate_by_id (pc, id, buf, max-32)) > 0)
	{
		buf += 2*sizeof (int) + size;
		if (p->version >= MEDIATYPE_FIRST && (i=isHit (pc, id)) >= 0)
			sendHitMedia (p, pc, i, id, 0);
		p->last_transferblock = CurrentTime;
	} else if (size == -2)
	{
		assert (0);
		PDEBUG ("Leave block %d to next round.\n", id);
		return -1;
	} else if (id >= 0)
	{
		*(int *)buf = id;
		buf += sizeof (int);
		*(int *)buf = 0;
		buf += sizeof (int);
		PINFO ("no block %d\n", id);
		send_P2P_PUSHLIST (pc, id);
	}
	*(int *)buffer = buf - buffer;
	setblockId(pj, id);
	writeDATAMessage(p,pc, pj);
//	PDEBUG ("Write block %d\n", id);
	return 0;
}

int process_P2PS (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
	struct Message *m = (struct Message *)(p->buf+p->start);

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case P2P_HELLO:
			if (process_P2P_HELLO (p, m) == -2)
				return -2;
			break;
		case P2P_PUSHLIST:
			if (process_P2P_PUSHLIST (p, m) == -2)
				return -2;
			break;
		case P2P_REPORT:	/* At present no action */
			break;
		case P2P_MSG:
			break;
		case P2P_SPUPDATE:
			break;
		case P2P_RESPONSE:
			break;
		case P2P_NEAR_PEERS:
			break;
		case P2P_REQMEDIA:
			sendIdMedia (p, p->pc, *(int *)(m->buffer), 0);
			break;
		default:
			PDEBUG ("Unknown message format from client\n");
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
	}
	return 0;
}

int closure_P2PS (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
//	struct Channel *pc = p->pc;
	struct Edge *pedge, *prevedge;
	PDEBUG ("CP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
	for (pedge=p->header; pedge; pedge=prevedge)
	{
		pedge->head->numclient --;
		prevedge=pedge->enext;
		delEdge (pedge);
	}
	FD_CLR(p->socket, &osocks);
	close (p->socket);
	FREE (p->buf);
	deleteAll (p);
	memset (p, 0, sizeof (struct Session));
	return 0;
}


int init_P2PC (int listnum)
{
	return 0;
}


int newChannel (struct Channel *pc, struct NormalAddress *client, int n, int flag)
{
	struct Session *p;
	int listnum, newconn = -1, max, sock_flag;
	struct Session *head;
	struct sockaddr_in addr;
	char *buf, buffer[MAX_DATA];

	if (pc == NULL) return -1;
	head = TRACKER[TYPE_P2PC].head;
	max = TRACKER[TYPE_P2PC].max;
	for (listnum = 0; listnum < max; listnum ++)
	{
		if(head[listnum].socket == 0)
		{
			if ((newconn = socket (PF_INET, SOCK_STREAM, 0)) < 0)
			{
				perror ("socket||gethostbyname");
				return -1;
			}
			memset (&addr, 0, sizeof (addr));
			addr.sin_port = client[0].sin_port;
			addr.sin_addr = client[0].sin_addr;
			addr.sin_family = AF_INET;
			if ((sock_flag = connect_nonb(newconn, &addr, sizeof (addr))) == -1)
			{
				close (newconn);
				return -1;
			}
			head[listnum].socket = newconn;
			head[listnum].type = TYPE_P2PC;
			head[listnum].flag = 1;
			head[listnum].sock_flag = sock_flag;
			head[listnum].buf = NEW();
			head[listnum].pc = pc;
			head[listnum].time_sec = CurrentTime;
			head[listnum].totalup = 0;
			head[listnum].last_transferblock = CurrentTime;
			FD_SET(newconn, &osocks);
			if (listnum > TRACKER[TYPE_P2PC].maxid)
				TRACKER[TYPE_P2PC].maxid = listnum;
			break;
		}
	}
	if (listnum >= max)
	{
		PDEBUG ("no space left for new incoming client.");
		close (newconn);
		return -1;
	}
	TRACKER[TYPE_P2PC].cur ++;
	(*(TRACKER[TYPE_P2PC].init)) (listnum);
	p = &(TRACKER[TYPE_P2PC].head[listnum]);
	pc->pcinfo->dataSource = p;
	pc->upsize = 0;
	pc->downsize = 0;

	PDEBUG("Connect to %s:%d. and send P2P_HELLO.\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));


	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_HELLO;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned char *)buf = 0;
	buf += sizeof (char);
	if (flag == TYPE_CP)
	{
		*(unsigned char *)buf = pc->pcinfo->numofsp;
		buf += sizeof (char);
		if (pc->pcinfo->numofsp)
		{
			memcpy (buf, &(pc->pcinfo->SPLIST), pc->pcinfo->numofsp*sizeof (struct NormalAddress));
			buf += pc->pcinfo->numofsp*sizeof (struct NormalAddress);
		}
	}
	*(int *)buffer = buf - buffer;
	if (writeMessage (p, pc, buffer) < 0)
	{
		perror ("CP: write SP");
		Clientclosure (listnum, TYPE_P2PC);
		return -1;
	}
	return listnum;
}


int process_P2P_SPUPDATE (int listnum, struct Message *m)
{
	int maxq;
	struct Edge *pedge;
	char buffer[MAX_DATA];
	struct Channel *pc;
	struct SPUpdate *s = (struct SPUpdate *)(m->buffer+MD5_LEN);
	int bShouldCloseSP = 0;

	if (m->len < sizeof (struct SPUpdate) + MD5_LEN + 5)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PC);
		return -1;
	}
	*(int *)buffer = sizeof (struct SPUpdate) + sizeof(int) + sizeof (char);
	*(unsigned char *)(buffer + sizeof (int)) = P2P_SPUPDATE;
	PINFO ("Recv SPUPDATE(%lld,%lld,%u,%u).\n", s->minKeySample, s->maxKeySample, s->minBlockID, s->maxBlockID);
	memcpy (buffer+sizeof(int)+sizeof(char), s, sizeof(struct SPUpdate));

	if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL) return -1;

	if (s->minBlockID == 0xffffffff && s->maxBlockID == 0xffffffff)
	{
		pc->pcinfo->dataSource = NULL;	/* indication of end */
		if(s->minKeySample == -1 && s->maxKeySample == -1)
			PDEBUG("NO SUCH RESOURCE!\n"); // no such resource
		else if(s->minKeySample == 0 && s->maxKeySample == 0)
			PDEBUG("CHANNEL HAS BEEN CLOSED!\n");// channel has been closed
		else
			PDEBUG("UNKNOWN MESSAGE! 1\n"); // unknown message
		bShouldCloseSP = 1; // should close connection
	} else
	{
		if(s->minBlockID == 0 && s->maxBlockID == 0 && s->minKeySample == 0 && s->maxKeySample == 0)
		{
			PDEBUG("END OF CHANNEL!\n"); // end of channel
			bShouldCloseSP = 1;
		} else if (s->minKeySample == -1ULL && s->maxKeySample == -1ULL)
		{
			INIT_MAXQ(pc,s,maxq);
			return 0;
		} else if (s->minKeySample == -2LL)
		{
			INIT_MAXQ(pc,s,maxq);
			pc->type = T_PLIST;
			pc->pcinfo->max_channel = (int)(s->maxKeySample);
			if (pc->pcinfo->max_channel <= 0 || pc->pcinfo->max_channel >= MAX_FILEINPUT)
				return -1;
			if (pc->pcinfo->media != NULL)
				freeMedia (pc);
			pc->pcinfo->media = calloc (sizeof (struct MediaData), pc->pcinfo->max_channel);
			return 0;
		} else if (s->minKeySample == -3LL)
		{
			pc->pcinfo->max_channel = 1;
			if (pc->pcinfo->media != NULL)
				freeMedia (pc);
			pc->pcinfo->media = calloc (sizeof (struct MediaData), 1);
		} else
		{
			memcpy (&(pc->pcinfo->s), s, sizeof (struct SPUpdate));
			// request block after spupdate, not wait!
			// now, block will be sent automaticlly by SP
			//	send_P2P_PUSHLIST (pc, s->maxBlockID);
		}
	}
	{
		int i = 0;
		unsigned char vcode = 0;
		for (i = 0; i < sizeof (struct SPUpdate); ++i) {
			vcode += ((unsigned char*)s)[i];
		}
		buffer[*(int*)buffer] = vcode;
		++*(int*)buffer;
	}
	for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
	{
		if (pedge->me->npcp == TYPE_CP)
		{
//			PDEBUG ("Send SPUPDATE to CP %d.\n", pedge->me-TRACKER[TYPE_P2PC].head);
			writeMessage (pedge->me, pc, (char *)m);
		} else
		{
//			PDEBUG ("Send SPUPDATE to NP %d.\n", pedge->me-TRACKER[TYPE_P2PS].head);
			writeMessage (pedge->me, pc, buffer);
		}
	}
	if(bShouldCloseSP != 0 || pedge == pc->PeerHead/* no NP*/)
			return -1; // Close Connection to SP
	return 0;
}

int process_P2P_RESPONSE (int listnum, struct Message *m)
{
	int size;
	struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
	struct Channel *pc;
	char *msg = m->buffer+MD5_LEN;

	
	if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL)
		return -1;

 	if ((size = saveBlock (pc, msg, p)) <= 0)
	{
   		PDEBUG ("save block error, size %d, %d\n", size, listnum);
		return -1;
//		Clientclosure (listnum, TYPE_P2PC);
	}
	p->last_transferblock = CurrentTime;
	return 0;
}

int process_P2PC (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
	struct Message *m = (struct Message *)(p->buf+p->start);

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case P2P_SPUPDATE:
			if(process_P2P_SPUPDATE (listnum, m) < 0)
			{
				Clientclosure(listnum, TYPE_P2PC);
			}
			break;
		case P2P_RESPONSE:
			process_P2P_RESPONSE (listnum, m);
			break;
		case P2P_MSG:
			break;
		case P2P_MEDIATYPE:
			process_P2P_MEDIATYPE (listnum, m);
			break;
		default:
			PDEBUG("Err msg type from SP.\n");
			Clientclosure (listnum, TYPE_P2PC);
			return -1;
	}
	return 0;
}

int closure_P2PC (int listnum)
{
//	struct Edge *pedge, *prevedge;
//	char buffer[MAX_DATA], *buf;
	struct Session *p=&(TRACKER[TYPE_P2PC].head[listnum]);
	struct Channel *pc = p->pc;

	if (pc)
	{
		/*
		buf = buffer + sizeof (int);
		*(unsigned char *)buf = P2P_SPUPDATE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		memset (buf, 0, sizeof (struct SPUpdate));
		buf += sizeof (struct SPUpdate);
		*(int *)buffer = buf - buffer;
		for (pedge=pc->PeerHead; pedge; pedge=prevedge)
		{
			pc->numclient --;
			writeMessage (pedge->me, buffer);
			prevedge = pedge->cnext;
			delEdge (pedge);
		}
		*/
		pc->pcinfo->dataSource = NULL;
	}
	PDEBUG ("SP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
	FD_CLR(p->socket, &osocks);
	close (p->socket);
	FREE (p->buf);
	deleteAll (p);
	memset (p, 0, sizeof (struct Session));
	return 0;
}

#ifdef HAVE_TS
int register_cp ()	//send UDP msg
{
	const int max_times = 10;
	int i;
	char *buf;
	char buffer[MAX_DATA];

	isSet = 0;

	if (TSSOCK == 0)
	{
		for (i=0; i<max_times; i++)
		{
			if (BINDALL == 0)
				TSSOCK = init_udp (BINDIP, cfgCP2TS_PORT);
			else
				TSSOCK = init_udp (NULL, cfgCP2TS_PORT);
			if (TSSOCK > 0) break;
			PDEBUG("Sleep 1000. cause init UDP port %d failed.", cfgCP2TS_PORT);
			sleep (1000);
		}
		if (TSSOCK <= 0)
		{
			PDEBUG ("exit...\n");
			exit (1);	//the max times try init_udp failure
		}
	}
	buf = buffer + sizeof (int);
	*(unsigned char *)buf = CP2TS_REGISTER;
	buf += sizeof (char);
	*(int *)buf = AUTH_USERID;
	buf += sizeof (int);
	strncpy (buf, AUTH_MD5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned short *)buf = htons (cfgP2PS_PORT);
	buf += sizeof (short);
	if (SCP_CHANNEL)	//Now only GCP is available
	{
		*buf = CT_SPECIFIED_RES;
		buf += sizeof (char);
		memcpy (buf, SCP_CHANNEL, strlen(SCP_CHANNEL)+1);
		buf += strlen (SCP_CHANNEL)+1;
	} else if (ECP_REGION)
	{
		*buf = CT_EDGE;
		buf += sizeof (char);
		buf = parseECP (ECP_REGION, buf);
	} else
	{
		*buf = CT_GENERAL;
		buf += sizeof (char);
	}
	*(int *)buffer = buf - buffer;	//the size of buffer;
	TSADDR.sin_port = htons (TS4CP_PORT);
	TSADDR.sin_addr.s_addr = inet_addr (SERVERIP);
	TSADDR.sin_family = AF_INET;
	if (sendMessage (TSSOCK, buffer, &TSADDR) < 0)	//send register msg
	{
		PDEBUG ("Cannot write to server\n");
		return -1;
	}
	return 0;
}
#endif

int init_cp ()
{
	FILE *pidf;
	struct rlimit rl;
	char buffer[MAX_DATA];
	rl.rlim_cur = rl.rlim_max = 1000000;
	if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	OPENLOG;
#ifdef DEBUG
	system ("ulimit -a");
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	rl.rlim_cur = rl.rlim_max = (rlim_t )102400;
	if (setrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	system ("ulimit -a");
#endif
#ifdef HAVE_TS
	if (register_cp () < 0)
	{
		PDEBUG ("Cannot init TS connection\n");
		return -1;
	}
#endif
	TRACKER[TYPE_P2PS].flag = FLAG_SERVER;
	TRACKER[TYPE_P2PS].type = TYPE_P2PS;
	TRACKER[TYPE_P2PS].port = cfgP2PS_PORT;
	TRACKER[TYPE_P2PS].cur = 0;
	TRACKER[TYPE_P2PS].max = MAX_P2PS;
	TRACKER[TYPE_P2PS].init = init_P2PS;
	TRACKER[TYPE_P2PS].process = process_P2PS;
	TRACKER[TYPE_P2PS].closure = closure_P2PS;
	TRACKER[TYPE_P2PS].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PS].max);
	switch (BINDALL)
	{
		case 0:
			if ((TRACKER[TYPE_P2PS].sock = init_server (BINDIP, cfgP2PS_PORT)) < 0)
			return -1;
			break;
		default:
			if ((TRACKER[TYPE_P2PS].sock = init_server (NULL, cfgP2PS_PORT)) < 0)
			return -1;
			break;
	}

	FD_SET(TRACKER[TYPE_P2PS].sock, &osocks);
	TRACKER[TYPE_P2PC].flag = FLAG_CLIENT;
	TRACKER[TYPE_P2PC].type = TYPE_P2PC;
	TRACKER[TYPE_P2PC].port = 0;
	TRACKER[TYPE_P2PC].cur = 0;
	TRACKER[TYPE_P2PC].max = MAX_P2PC;
	TRACKER[TYPE_P2PC].init = init_P2PC;
	TRACKER[TYPE_P2PC].process = process_P2PC;
	TRACKER[TYPE_P2PC].closure = closure_P2PC;
	TRACKER[TYPE_P2PC].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PC].max);

	mkdir (PREFIX, 0777);
	snprintf (buffer, MAX_DATA, "%s/%s", PREFIX, LIVE_PREFIX);
	mkdir (buffer, 0777);
	snprintf (buffer, MAX_DATA, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
	system (buffer);
	if ((pidf = fopen (PIDFile, "w")) == NULL)
	{
		PDEBUG ("Cannot open pidfile.\n");
		return -1;
	}
	fprintf (pidf, "%d\n", getpid ());
	fclose (pidf);
	return 0;
}

#ifdef HAVE_TS
int process_TS()
{
	struct sockaddr_in dest;
	int addr_len = sizeof (dest);
	struct Message m;
	int i;

	if ((i = recvfrom (TSSOCK, &m, sizeof (struct Message), 0, (struct sockaddr *)&dest, &addr_len)) <= 0)
	{
		PDEBUG ("Error in recving ts message.\n");
		register_cp ();
		return 0;
	}
	switch (m.type)
	{
		case TS2CP_WELCOME:
			memcpy (&UDPMsg, &m, 12);
			PDEBUG("recv WELCOME from TS.\n");
/* isSet whether TS has returned WELCOME message */
			isSet = 1;
			break;
		case TS2CP_PEERS:
			process_TS2CP_PEERS (m.buffer);
			break;
		case TS2CP_MSG:
			if (*(char *)(m.buffer+sizeof(short)))
			{
				PDEBUG ("Error in TS2CP_MSG. \n");
				register_cp ();
			}
			break;
		default:
			PDEBUG ("Error in trackerserver message format\n");
			register_cp ();
			return -1;
	}
	return 0;
}

int closure_TS ()
{
	struct TSMessage *m = &UDPMsg;

	m->type = CP2TS_LOGOUT;
	m->len = 12;
	sendMessage (TSSOCK, (char *)m, &TSADDR);
	return 0;
}

int process_TS2CP_PEERS (char *buf)
{
	struct Channel *pc;
	int listnum;
	char *channel_md5;
	unsigned char cpsize;
	struct NormalAddress *CPlist;
	unsigned char peersize;
	struct PeerInfoWithAddr *pinfo;


	channel_md5 = buf;
	buf += MD5_LEN;
	cpsize = *(unsigned char *)buf;
	buf += sizeof (char);
	CPlist = (struct NormalAddress *)buf;
	buf += sizeof(struct NormalAddress)*cpsize;
	peersize = *(unsigned char *)buf;
	buf += sizeof (char);
	pinfo = (struct PeerInfoWithAddr *)buf;
	// now find the channel
	if ((pc = findChannel (channel_md5, MD5_LEN)) == NULL) return -1;
	if ((listnum = newChannel (pc, CPlist, cpsize, TYPE_CP)) < 0)
		return -1;
	return 0;
}
#endif


char *parseECP (char *str, char *buf)
{
	char  *buffer = buf;
	int flag = -1;
	unsigned char c;
	unsigned char part;
	buf += sizeof (int);
	for (part=0; *str ;str++)
	{
		c = *str;
		switch (c)
		{
			case ':':
				flag = 0;
				break;
			case '.':
				if (flag < 2)
				{
					*(unsigned char *)buf = part;
					buf += sizeof (char);
				}
				part = 0;
				flag ++;
				break;
			default:
				if (c >= '0' && c <= '9')
					part = c;
				break;
		}
	}
	*(int *)buffer = (buf - buffer - sizeof (int))/sizeof(short);
	return buf;
}

int periodCheck (float KBPSused)
{
	struct Session *head;
	int max, listnum;
	struct statistics
	{
		unsigned int resnum;
		unsigned short connnum;
		float bandwidth;
	} stat;
	stat.bandwidth = KBPSused/(MAX_BANDWIDTH*1024)/8;

/* isSet whether TS has returned WELCOME message */
	if (isSet == 0)
	{
		register_cp ();
		return 0;
	}
	max = TRACKER[TYPE_P2PC].maxid + 1;
	head = TRACKER[TYPE_P2PC].head;
	memset (&stat, 0, sizeof(stat));
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket > 0)
		{
			stat.resnum ++;
			stat.connnum ++;
		}
	}
	max = TRACKER[TYPE_P2PS].maxid + 1;
	head = TRACKER[TYPE_P2PS].head;
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket > 0)
		{
			if (head[listnum].pc == NULL &&
				head[listnum].header == NULL &&
				CurrentTime - head[listnum].last_transferblock > MAX_TRANSFER_IDLE)
			{
				PDEBUG ("timeout %d from NP %d to %d.\n", listnum, head[listnum].last_transferblock, (int)CurrentTime);
				Clientclosure (listnum, TYPE_P2PS);
			}
			else
				stat.connnum ++;
		}
	}

#ifdef HAVE_TS
	*(unsigned int *)(UDPMsg.buffer) = stat.resnum;
	*(unsigned short *)(UDPMsg.buffer+sizeof(int)) = stat.connnum;
	*(float *)(UDPMsg.buffer+sizeof(int)+sizeof(short)) = stat.bandwidth;
	if(stat.connnum <= MAX_P2PS)
			*(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 0;
		else
			*(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 1;
	UDPMsg.len = 23;
	UDPMsg.type = CP2TS_UPDATE;
	if (sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
	{
		PDEBUG ("exit...\n");
		exit (1);
	}
#endif
	PDEBUG("Res Num: %d. Connection Num: %d. BandWidth Usage: %.4f. \n", stat.resnum, stat.connnum, stat.bandwidth);
	return 0;
}

void makeSnapShot(int count, int time_interval)
{
	time_t tmpTime;
	struct tm result;
	struct Channel *pc, *nextpc;
//	struct Session *ps;
//	struct Edge *pe;
	int cpchannelcount = 0;
	int totalclient = 0;
	long long totalupsize = 0, totaldownsize = 0;
	FILE *f;
	char buffer[MAX_DATA];

	if (time_interval <= 0)
		return;
	localtime_r(&CurrentTime, &result);
	sprintf (buffer, "./cp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
	if ((f = fopen(buffer,"a")) == NULL)
	{
		PDEBUG("Couldn't open cp.log file! \n");
		return;
	}
	fseeko(f, 0, SEEK_END);

	// 1. start CP SnapShot 
	fprintf(f, "\n\n**************Start %d SnapShot of CP, Time : %u/%u %u:%u:%u.*********  \n",count,result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);

	// 2. log speed
	fprintf(f, "CP: cur Down %.4f KB. \n", ((float)tmpDownBytes)/1024/time_interval);
	fprintf(f, "CP: cur Up   %.4f KB. \n", ((float)tmpUpBytes)/1024/time_interval);
			
	periodCheck(((float)tmpDownBytes+tmpUpBytes)/1024/time_interval);
			
	totalDownBytes += tmpDownBytes;
	totalUpBytes += tmpUpBytes;
	tmpTime = CurrentTime - startTime; 
	fprintf(f, "CP: avg Down %.4f KB. \n", ((float)totalDownBytes)/1024/tmpTime);
	fprintf(f, "CP: avg Up   %.4f KB. \n", ((float)totalUpBytes)/1024/tmpTime);

	// 3. log channel state
	for (pc=ChannelList; pc; pc=nextpc)
	{
		nextpc = pc->lnext;
		++cpchannelcount;
		totalclient += pc->numclient;
		totalupsize += pc->upsize;
		totaldownsize += pc->downsize;
		fprintf(f,"Channel %s have %d client. Down size %lld, avg speed %f. Up Size %lld, avg speed %f. \n",pc->fname,pc->numclient,pc->downsize, ((float)(pc->downsize)) / time_interval, pc->upsize, ((float)(pc->upsize))/ time_interval);
			/*
			for (pe=pc->PeerHead; pe; pe = pe->cnext)
			{
				// if bitrate < 300kb/s ,then kill it
				if (pe->me->totalup/(CurrentTime - pe->me->time_sec)/1024 < 300)
				fprintf(f,"Session bitrate:%lld .Too slow ! \n",pe->me->totalup/(CurrentTime - pe->me->time_sec));
			}
			*/
		if (pc->numclient == 0)
			freeLiveChannel (pc, NULL);
		else if (CurrentTime - pc->last_nearpeer > NearPeerInterval)
		{
			send_nearpeers_toall (pc);
			pc->last_nearpeer = CurrentTime;
		}

	}
	fprintf(f,"Channel Count : %d. Total client : %d. Total dowsize: %lld. Total upsize %lld \n",cpchannelcount,totalclient,totaldownsize,totalupsize);
	
	fprintf(f,"\n*********************End of SnapShot************************\n");
	fclose(f);
	logto_xml (time_interval, tmpTime, cpchannelcount, totalclient);
	tmpDownBytes = tmpUpBytes = 0;
}

int reconnect (struct Session *p)
{
	struct NormalAddress *client;
	struct sockaddr_in addr;
	if (p->pc == NULL || p->pc->pcinfo->numofsp <= p->flag) return -1;
	
	close (p->socket);
	FD_CLR(p->socket, &osocks);
	if ((p->socket = socket (PF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror ("socket||gethostbyname");
		p->socket = 0;
		return -1;
	}
	client = &(p->pc->pcinfo->SPLIST[p->flag]);
	memset (&addr, 0, sizeof (addr));
	addr.sin_port = client->sin_port;
	addr.sin_addr = client->sin_addr;
	addr.sin_family = AF_INET;
	p->flag ++;
	if ((p->sock_flag = connect_nonb(p->socket, &addr, sizeof (addr))) == -1)
	{
		close (p->socket);
		return -1;
	}
	
	FD_SET(p->socket, &osocks);
	p->time_sec = CurrentTime;
	p->totalup = 0;
	return p->flag;
}

int send_P2P_PUSHLIST (struct Channel *pc, int id)
{
	unsigned char *ptr;
	char buffer[MAX_DATA], *buf;
	struct LiveChannelInfo *pcinfo = pc->pcinfo;
	int i, j;
	
	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_PUSHLIST;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned char *)buf = 0;
	buf += sizeof (char);
	ptr = buf;
	buf += sizeof (char);
	*ptr = 0;
	for (i=id; (i >= pcinfo->s.minBlockID && i <= pcinfo->s.maxBlockID) && i<id+REQUEST_AHEAD; i++)
	{
		j = i % pcinfo->max_queue;
		if (pcinfo->indisk[j] == (i/pcinfo->max_queue+1) || isSet (pcinfo->bitflag, j))
			continue;
		*(int *)buf = i;
		buf += sizeof (int);
		(*ptr) ++;
		setBit (pcinfo->bitflag, j);
	}
	if (*ptr > 0)
	{
		*(unsigned char *)buf = 0;
		buf += sizeof (char);
		*(int *)buffer = buf - buffer;
		writeMessage (pcinfo->dataSource, pc, buffer);
	}
	return 0;
}

int send_nearpeers (struct Channel *pc, struct Edge *pme)
{
	char buffer[MAX_DATA], *buf, *ptr;
	struct Edge *pedge;
	int i;
	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_NEAR_PEERS;
	buf += sizeof (char);
	ptr = buf;
	buf += sizeof (char);
	for (i=0, pedge=pme->cnext; i<MAX_NEARPEER; pedge=pedge->cnext)
	{
		if (pedge == NULL)
			pedge = pc->PeerHead;
		if (pedge == pme)
			break;
		memcpy (buf, &(pedge->me->addr), sizeof (struct PeerInfoWithAddr));
		buf += sizeof (struct PeerInfoWithAddr);
		i++;
	}
	if (i > 0)
	{
		*(unsigned char *)ptr = i;
		*(int *)buffer = buf - buffer;
		writeMessage (pme->me, pc, buffer);
	}
	return 0;
}

int send_nearpeers_toall (struct Channel *pc)
{
	struct Edge *pedge;
	for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
		send_nearpeers (pc, pedge);
	return 0;
}
